{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Feature Transformation with a Amazon SageMaker Processing Job and SparkML\n",
    "\n",
    "Typically a machine learning (ML) process consists of few steps. First, gathering data with various ETL jobs, then pre-processing the data, featurizing the dataset by incorporating standard techniques or prior knowledge.\n",
    "\n",
    "Often, distributed data processing frameworks such as Spark are used to pre-process data sets in order to prepare them for training. In this notebook we'll use Amazon SageMaker Processing, and leverage the power of Spark in a managed SageMaker environment to run our processing workload."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "![](img/prepare_dataset.png)\n",
    "\n",
    "![](img/processing.jpg)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Contents\n",
    "\n",
    "1. Setup\n",
    "1. Use Amazon SageMaker Processing Job to execute a Spark ML Transformation\n",
    "1. Setup Input Data\n",
    "1. Setup Output Data\n",
    "1. Build a Spark container for running the processing job\n",
    "1. Run the Processing Job using Amazon SageMaker\n",
    "1. Inspect the Processed Output Dta"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Setup\n",
    "\n",
    "Add the following policies to your SageMaker Execution Role:  \n",
    "* `EC2ContainerRegistry`\n",
    "* Permissions: `List`, `Read`, `Write` \n",
    "* Repository:  `amazon-reviews-spark-processor`\n",
    "\n",
    "Let's start by specifying:\n",
    "* The S3 bucket and prefixes that you use for training and model data. Use the default bucket specified by the Amazon SageMaker session.\n",
    "* The IAM role ARN used to give processing and training access to the dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sagemaker\n",
    "import boto3\n",
    "\n",
    "sagemaker_session = sagemaker.Session()\n",
    "role = sagemaker.get_execution_role()\n",
    "bucket = sagemaker_session.default_bucket()\n",
    "region = boto3.Session().region_name"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Using Amazon SageMaker Processing Job to Run a Spark ML Transformation"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Get the name of previous Scikit Processing Job to use to construct the input for our Spark ML Processing Job."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Setup Input Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r scikit_processing_job_s3_output_prefix"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print('Previous Scikit Processing Job Name: {}'.format(scikit_processing_job_s3_output_prefix))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws s3 ls s3://$bucket/$scikit_processing_job_name/output/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "balanced_train_data_input = 's3://{}/{}/output/raw-labeled-split-balanced-header-train/'.format(bucket, scikit_processing_job_s3_output_prefix)\n",
    "balanced_validation_data_input = 's3://{}/{}/output/raw-labeled-split-balanced-header-validation/'.format(bucket, scikit_processing_job_s3_output_prefix)\n",
    "balanced_test_data_input = 's3://{}/{}/output/raw-labeled-split-balanced-header-test/'.format(bucket, scikit_processing_job_s3_output_prefix)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws s3 ls $balanced_train_data_input"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws s3 ls $balanced_validation_data_input"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws s3 ls $balanced_test_data_input"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Setup Output Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from time import gmtime, strftime\n",
    "timestamp_prefix = strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n",
    "\n",
    "output_prefix = 'amazon-reviews-spark-processor-{}'.format(timestamp_prefix)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "balanced_train_data_tfidf_output = 's3://{}/{}/output/tfidf-labeled-split-balanced-noheader-train'.format(bucket, output_prefix)\n",
    "balanced_validation_data_tfidf_output = 's3://{}/{}/output/tfidf-labeled-split-balanced-noheader-validation'.format(bucket, output_prefix)\n",
    "balanced_test_data_tfidf_output = 's3://{}/{}/output/tfidf-labeled-split-balanced-noheader-test'.format(bucket, output_prefix)\n",
    "\n",
    "print(balanced_train_data_tfidf_output)\n",
    "print(balanced_validation_data_tfidf_output)\n",
    "print(balanced_test_data_tfidf_output)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Build a Spark Docker Image to Run the Processing Job\n",
    "\n",
    "An example Spark container is included in the `./container` directory of this example. The container handles the bootstrapping of all Spark configuration, and serves as a wrapper around the `spark-submit` CLI. At a high level the container provides:\n",
    "* A set of default Spark/YARN/Hadoop configurations\n",
    "* A bootstrapping script for configuring and starting up Spark master/worker nodes\n",
    "* A wrapper around the `spark-submit` CLI to submit a Spark application\n",
    "\n",
    "\n",
    "After the container build and push process is complete, use the Amazon SageMaker Python SDK to submit a managed, distributed Spark application that performs our dataset processing.\n",
    "\n",
    "Build the example Spark container."
   ]
  },
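  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the wrapper's role concrete, here is a minimal, hypothetical sketch of what such a `spark-submit` wrapper could look like. This is not the actual `/opt/program/submit` script from `./container` -- the real bootstrap also generates the Spark/YARN/Hadoop configuration and starts the master and worker daemons.\n",
    "\n",
    "```python\n",
    "# Simplified, hypothetical sketch of a spark-submit wrapper (see ./container for the real one).\n",
    "import subprocess\n",
    "import sys\n",
    "\n",
    "def submit(app_path, app_args):\n",
    "    # Forward the processing script and its arguments to spark-submit,\n",
    "    # running against the YARN cluster bootstrapped inside the container.\n",
    "    cmd = ['spark-submit', '--master', 'yarn', '--deploy-mode', 'client', app_path] + list(app_args)\n",
    "    subprocess.run(cmd, check=True)\n",
    "\n",
    "if __name__ == '__main__':\n",
    "    submit(sys.argv[1], sys.argv[2:])\n",
    "```"
   ]
  },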
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "docker_repo = 'amazon-reviews-spark-processor'\n",
    "docker_tag = 'latest'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!docker build -t $docker_repo:$docker_tag -f container/Dockerfile ./container"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create an Amazon Elastic Container Registry (Amazon ECR) repository for the Spark container and push the image."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "account_id = boto3.client('sts').get_caller_identity().get('Account')\n",
    "region = boto3.session.Session().region_name\n",
    "\n",
    "image_uri = '{}.dkr.ecr.{}.amazonaws.com/{}:{}'.format(account_id, region, docker_repo, docker_tag)\n",
    "print(image_uri)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create ECR repository and push docker image"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!$(aws ecr get-login --region $region --registry-ids $account_id --no-include-email)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws ecr describe-repositories --repository-names $docker_repo || aws ecr create-repository --repository-name $docker_repo"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!docker tag $docker_repo:$docker_tag $image_uri"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!docker push $image_uri"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Run the Job using Amazon SageMaker Processing Jobs\n",
    "\n",
    "Next, use the Amazon SageMaker Python SDK to submit a processing job. Use the Spark container that was just built, and a SparkML script for processing in the job configuration."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Review the Spark processing script."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cat preprocess-spark-text-to-tfidf-depends-on-scikit.py"
   ]
  },
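  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The script above is the source of truth. For orientation, the following is a minimal, hypothetical sketch of the general SparkML TF-IDF approach it follows: tokenize the review text, hash the tokens into term-frequency vectors, then apply IDF. The column names, feature dimension, and input path here are illustrative assumptions, not the script's exact values.\n",
    "\n",
    "```python\n",
    "# Hypothetical SparkML TF-IDF sketch (illustrative only -- not the exact script above).\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.ml import Pipeline\n",
    "from pyspark.ml.feature import Tokenizer, HashingTF, IDF\n",
    "\n",
    "spark = SparkSession.builder.appName('tfidf-sketch').getOrCreate()\n",
    "\n",
    "# Assumed input: CSV with a header and a 'review_body' text column (illustrative).\n",
    "df = spark.read.csv('s3a://<bucket>/<prefix>/train/', header=True)\n",
    "\n",
    "tokenizer = Tokenizer(inputCol='review_body', outputCol='words')\n",
    "hashing_tf = HashingTF(inputCol='words', outputCol='raw_features', numFeatures=1000)\n",
    "idf = IDF(inputCol='raw_features', outputCol='features')\n",
    "\n",
    "pipeline = Pipeline(stages=[tokenizer, hashing_tf, idf])\n",
    "features_df = pipeline.fit(df).transform(df)\n",
    "features_df.select('features').show(5, truncate=False)\n",
    "```"
   ]
  },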
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sagemaker.processing import ScriptProcessor\n",
    "\n",
    "processor = ScriptProcessor(base_job_name='spark-amazon-reviews-processor',\n",
    "                            image_uri=image_uri,\n",
    "                            command=['/opt/program/submit'],\n",
    "                            role=role,\n",
    "                            instance_count=2, # instance_count needs to be > 1 or you will see the following error:  \"INFO yarn.Client: Application report for application_ (state: ACCEPTED)\"\n",
    "                            instance_type='ml.r5.8xlarge',\n",
    "                            env={'mode': 'python'})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Start the Spark Processing Job\n",
    "\n",
    "_Notes on Invoking from Lambda:_\n",
    "* However, if we use the boto3 SDK (ie. with a Lambda), we need to copy the `preprocess-text-to-tfidf.py` file to S3 and specify the everything include --py-files, etc.\n",
    "* We would need to do the following before invoking the Lambda:\n",
    "     !aws s3 cp preprocess.py s3://<location>/sagemaker/spark-preprocess-reviews-demo/code/preprocess.py\n",
    "     !aws s3 cp preprocess.py s3://<location>/sagemaker/spark-preprocess-reviews-demo/py_files/preprocess.py\n",
    "* Then reference the s3://<location> above in the --py-files, etc.\n",
    "* See Lambda example code in this same project for more details.\n",
    "\n",
    "_Notes on not using ProcessingInput and Output:_\n",
    "* Since Spark natively reads/writes from/to S3 using s3a://, we can avoid the copy required by ProcessingInput and ProcessingOutput (FullyReplicated or ShardedByS3Key) and just specify the S3 input and output buckets/prefixes._\"\n",
    "* See https://github.com/awslabs/amazon-sagemaker-examples/issues/994 for issues related to using /opt/ml/processing/input/ and output/\n",
    "* If we use ProcessingInput, the data will be copied to each node (which we don't want in this case since Spark already handles this)"
   ]
  },
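  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For reference, here is a hedged sketch of what the equivalent boto3 call (e.g. from a Lambda function) might look like. The job name is a placeholder, the script wiring via S3 and `--py-files` is omitted (see the notes above and the project's Lambda example), and in a real Lambda the role, image URI, and S3 paths would come from the function's configuration rather than from this notebook.\n",
    "\n",
    "```python\n",
    "# Hypothetical boto3 sketch for invoking the Spark processing job outside the SageMaker Python SDK.\n",
    "import boto3\n",
    "\n",
    "sm = boto3.client('sagemaker')\n",
    "\n",
    "sm.create_processing_job(\n",
    "    ProcessingJobName='spark-amazon-reviews-processor-from-lambda',  # placeholder name\n",
    "    RoleArn=role,\n",
    "    AppSpecification={\n",
    "        'ImageUri': image_uri,\n",
    "        'ContainerEntrypoint': ['/opt/program/submit'],\n",
    "        'ContainerArguments': ['s3_input_balanced_train_data', balanced_train_data_input,\n",
    "                               's3_input_balanced_validation_data', balanced_validation_data_input,\n",
    "                               's3_input_balanced_test_data', balanced_test_data_input,\n",
    "                               's3_output_balanced_train_data', balanced_train_data_tfidf_output,\n",
    "                               's3_output_balanced_validation_data', balanced_validation_data_tfidf_output,\n",
    "                               's3_output_balanced_test_data', balanced_test_data_tfidf_output]\n",
    "    },\n",
    "    ProcessingResources={\n",
    "        'ClusterConfig': {\n",
    "            'InstanceCount': 2,\n",
    "            'InstanceType': 'ml.r5.8xlarge',\n",
    "            'VolumeSizeInGB': 30\n",
    "        }\n",
    "    },\n",
    "    StoppingCondition={'MaxRuntimeInSeconds': 7200}\n",
    ")\n",
    "```"
   ]
  },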
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sagemaker.processing import ProcessingOutput\n",
    "\n",
    "processor.run(code='preprocess-spark-text-to-tfidf-depends-on-scikit.py',\n",
    "              arguments=['s3_input_balanced_train_data', balanced_train_data_input,\n",
    "                         's3_input_balanced_validation_data', balanced_validation_data_input,\n",
    "                         's3_input_balanced_test_data', balanced_test_data_input,\n",
    "                         's3_output_balanced_train_data', balanced_train_data_tfidf_output,\n",
    "                         's3_output_balanced_validation_data', balanced_validation_data_tfidf_output,\n",
    "                         's3_output_balanced_test_data', balanced_test_data_tfidf_output,                         \n",
    "              ],\n",
    "              # We need this dummy output to allow us to call \n",
    "              #    ProcessingJob.from_processing_name() later \n",
    "              #    to describe the job and poll for Completed status\n",
    "              outputs=[\n",
    "                       ProcessingOutput(s3_upload_mode='EndOfJob',\n",
    "                                        output_name='dummy-output',\n",
    "                                        source='/opt/ml/processing/output')\n",
    "              ],          \n",
    "              logs=True,\n",
    "              wait=False\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "spark_processing_job_name = processor.jobs[-1].describe()['ProcessingJobName']\n",
    "\n",
    "display(HTML('<b>Review <a href=\"https://console.aws.amazon.com/cloudwatch/home?region={}#logStream:group=/aws/sagemaker/ProcessingJobs;prefix={};streamFilter=typeLogStreamPrefix\">CloudWatch Logs</a> After About 5 Minutes</b>'.format(region, spark_processing_job_name)))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "# This is different than the job name because we are not using ProcessingOutput's in this Spark ML case.\n",
    "spark_processing_job_s3_output_prefix = output_prefix\n",
    "\n",
    "display(HTML('<b>Review <a href=\"https://s3.console.aws.amazon.com/s3/buckets/{}/{}/?region={}&tab=overview\">S3 Output Data</a> After The Spark Job Has Completed</b>'.format(bucket, spark_processing_job_s3_output_prefix, region)))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Please Wait Until the Processing Job Completes\n",
    "Re-run this next cell until the job status shows `Completed`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "running_processor = sagemaker.processing.ProcessingJob.from_processing_name(processing_job_name=spark_processing_job_name,\n",
    "                                                                            sagemaker_session=sagemaker_session)\n",
    "\n",
    "processing_job_description = running_processor.describe()\n",
    "\n",
    "processing_job_status = processing_job_description['ProcessingJobStatus']\n",
    "print('\\n')\n",
    "print(processing_job_status)\n",
    "print('\\n')\n",
    "\n",
    "print(processing_job_description)"
   ]
  },
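  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Alternatively, instead of re-running the cell above by hand, you can poll the job status in a loop until it leaves the `InProgress` state. This is a simple sketch that uses only the `describe()` call shown above:\n",
    "\n",
    "```python\n",
    "# Optional: poll the processing job until it finishes instead of re-running the cell above.\n",
    "import time\n",
    "\n",
    "status = running_processor.describe()['ProcessingJobStatus']\n",
    "while status == 'InProgress':\n",
    "    print('Status: {} -- waiting 60 seconds...'.format(status))\n",
    "    time.sleep(60)\n",
    "    status = running_processor.describe()['ProcessingJobStatus']\n",
    "\n",
    "print('Final status: {}'.format(status))\n",
    "```"
   ]
  },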
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Inspect the Processed Output Dataset\n",
    "\n",
    "\n",
    "## The next cells will not work properly until the job completes above.\n",
    "\n",
    "\n",
    "Take a look at a few rows of the transformed dataset to make sure the processing was successful."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "!aws s3 ls --recursive $balanced_train_data_tfidf_output/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws s3 ls --recursive $balanced_validation_data_tfidf_output/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "!aws s3 ls --recursive $balanced_test_data_tfidf_output/"
   ]
  },
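  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To actually look at a few rows rather than just list the files, you can download one of the output part files and load it with pandas. This is a hedged sketch: it assumes the Spark job wrote the TF-IDF output as headerless CSV part files, so adjust it if the script's actual output format differs.\n",
    "\n",
    "```python\n",
    "# Sketch: download one training output part file and preview a few rows with pandas.\n",
    "# Assumes headerless CSV output from the Spark job -- adjust if the format differs.\n",
    "import boto3\n",
    "import pandas as pd\n",
    "\n",
    "s3 = boto3.client('s3')\n",
    "train_prefix = '{}/output/tfidf-labeled-split-balanced-noheader-train'.format(output_prefix)\n",
    "\n",
    "# Find the first non-empty object under the training output prefix.\n",
    "objects = s3.list_objects_v2(Bucket=bucket, Prefix=train_prefix)['Contents']\n",
    "first_key = next(obj['Key'] for obj in objects if obj['Size'] > 0)\n",
    "\n",
    "s3.download_file(bucket, first_key, 'train_part_preview.csv')\n",
    "df = pd.read_csv('train_part_preview.csv', header=None)\n",
    "df.head(5)\n",
    "```"
   ]
  },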
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Pass `spark_processing_job_s3_output_prefix` as input to the next notebook"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(spark_processing_job_s3_output_prefix)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store spark_processing_job_s3_output_prefix\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
